package com.xdl.apitest.tabletest

import com.xdl.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * Project: FlinkTutorial
 * Package: com.xdl.apitest.tabletest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-15:21
 */
object FsOutputTest {

  // Default locations, used when no program arguments are supplied.
  private val DefaultInputPath: String =
    "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
  private val DefaultOutputPath: String =
    "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\output.txt"

  /**
   * Reads comma-separated sensor readings from a text file, converts them to a
   * table, and writes the `id` / `temp` columns of sensor "sensor_1" to a CSV
   * file registered as the table `outputTable`.
   *
   * @param args optional overrides: `args(0)` is the input file path and
   *             `args(1)` is the output file path; the defaults above are
   *             used for any argument that is missing
   */
  def main(args: Array[String]): Unit = {
    val inputPath: String = args.lift(0).getOrElse(DefaultInputPath)
    val outputPath: String = args.lift(1).getOrElse(DefaultOutputPath)

    // 0. Create the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Create the table environment on top of the stream environment.
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. Read the raw lines and map each one to the SensorReading case class.
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[SensorReading] = inputStream
      .map(line => {
        // Trim every field so that "a, b, c" style lines parse as well as "a,b,c".
        val fields = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // 3. Convert the stream to a table. The temperature column is renamed to
    //    `temp` so it matches both the query below and the sink schema.
    val sensorTable: Table =
      tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    // 4. Table transformations.
    // 4.1 Simple projection + filter.
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    // 4.2 Aggregation example: number of readings per sensor id.
    // NOTE(review): this table is intentionally not written to the file sink;
    // a grouped aggregation updates earlier results, which the CSV file sink
    // presumably cannot accept (append-only) - confirm before wiring it up.
    val aggResultTable: Table = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    // 5. Register the output file as a table and write the result into it.
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable") // registers the sink in the table environment
    resultTable.insertInto("outputTable")

    env.execute("fs output test job")
  }

}
